#!/usr/bin/env python3
#
# SPDX-License-Identifier: GPL-2.0
#
# Copyright (c) 2013-2023 Igor Pecovnik, igor@armbian.com
#
# This file is a part of the Armbian Build Framework
# https://github.com/armbian/build/
#
import email.utils
import logging
import mailbox
import os
import re
import subprocess
import tempfile

import git  # GitPython
from unidecode import unidecode
from unidiff import PatchSet

from common.patching_config import PatchingConfig
from common.term_colors import background_dark_or_light

MAGIC_MBOX_MARKER_STANDARD = "Mon Sep 17 00:00:00 2001"
MAGIC_MBOX_MARKER_B4 = "git@z Thu Jan  1 00:00:00 1970"

REGEX_PATCH_FILENAMES = r"^patching file \"(.+)\""
log: logging.Logger = logging.getLogger("patching_utils")

# Magic strings and regex for rewriting patches' "index xxx....yyyy" lines
index_zero = f"{'0' * 12}"
index_from_zero = f"index {'0' * 12}..{'1' * 12}"
index_not_zero = f"index {'1' * 12}..{'2' * 12}"
index_rewrite_regexp: re.Pattern = re.compile(r"index ([0-9a-f]{12})\.\.([0-9a-f]{12})")


# Callback used for rewriting index lines.
def rewrite_indexes_callback(x: re.Match):  # Preserve zero from's for new file creations.
	if x.group(1) == index_zero:
		return index_from_zero
	return index_not_zero


class PatchRootDir:
	def __init__(self, abs_dir, root_type, patch_type, root_dir):
		self.abs_dir = abs_dir
		self.root_type = root_type
		self.patch_type = patch_type
		self.root_dir = root_dir


class PatchSubDir:
	def __init__(self, rel_dir, sub_type):
		self.rel_dir = rel_dir
		self.sub_type = sub_type


class PatchDir:
	def __init__(self, patch_root_dir: PatchRootDir, patch_sub_dir: PatchSubDir, abs_root_dir: str):
		self.patch_root_dir: PatchRootDir = patch_root_dir
		self.patch_sub_dir: PatchSubDir = patch_sub_dir
		self.full_dir = os.path.realpath(os.path.join(self.patch_root_dir.abs_dir, self.patch_sub_dir.rel_dir))
		self.rel_dir = os.path.relpath(self.full_dir, abs_root_dir)
		self.root_type = self.patch_root_dir.root_type
		self.sub_type = self.patch_sub_dir.sub_type
		self.patch_files: list[PatchFileInDir] = []
		self.is_autogen_dir: bool = False

	def __str__(self) -> str:
		return "<PatchDir: full_dir:'" + str(self.full_dir) + "'>"

	def find_series_patch_files(self) -> list["PatchFileInDir"]:
		# do nothing if the self.full_path is not a real, existing, directory
		if not os.path.isdir(self.full_dir):
			return []

		# If the directory contains a series.conf file.
		series_patches: list[PatchFileInDir] = []
		series_conf_path = os.path.join(self.full_dir, "series.conf")
		if os.path.isfile(series_conf_path):
			counter = 0
			patches_in_series = self.parse_series_conf(series_conf_path)
			for patch_file_name in patches_in_series:
				patch_file_path = os.path.join(self.full_dir, patch_file_name)
				if os.path.isfile(patch_file_path):
					counter += 1
					patch_file = PatchFileInDir(patch_file_path, self)
					patch_file.from_series = True
					patch_file.series_counter = counter
					# Fix basename for patches in series.conf
					relative_path = os.path.relpath(patch_file_path, self.full_dir)
					patch_file.relative_dirs_and_base_file_name = os.path.splitext(relative_path)[0]
					series_patches.append(patch_file)
				else:
					raise Exception(
						f"series.conf file {series_conf_path} contains a patch file {patch_file_name} that does not exist")
		return series_patches

	def find_files_patch_files(self) -> list["PatchFileInDir"]:
		# do nothing if the self.full_path is not a real, existing, directory
		if not os.path.isdir(self.full_dir):
			return []

		# Find the files in self.full_dir that end in .patch; do not consider subdirectories.
		# Add them to self.patch_files.
		for file in os.listdir(self.full_dir):
			# noinspection PyTypeChecker
			if file.endswith(".patch"):
				self.patch_files.append(PatchFileInDir(file, self))
		return self.patch_files

	@staticmethod
	def parse_series_conf(series_conf_path):
		patches_in_series = []
		with open(series_conf_path, "r") as series_conf_file:
			for line in series_conf_file:
				line = line.strip()
				if line.startswith("#"):
					continue
				# if line begins with "-", skip it
				if line.startswith("-"):
					continue
				if line == "":
					continue
				patches_in_series.append(line)
		return patches_in_series


class PatchFileInDir:
	def __init__(self, file_name, patch_dir: PatchDir):
		self.file_name = file_name
		self.patch_dir: PatchDir = patch_dir
		self.relative_dirs_and_base_file_name = os.path.splitext(self.file_name)[0]
		self.file_name_no_ext_no_dirs = os.path.basename(self.relative_dirs_and_base_file_name)
		self.from_series = False
		self.series_counter = None
		self.multiple_patches_in_file = False

	def __str__(self) -> str:
		desc: str = f"<PatchFileInDir: file_name:'{self.file_name}', dir:{self.patch_dir.__str__()} >"
		return desc

	def full_file_path(self):
		return os.path.join(self.patch_dir.full_dir, self.file_name)

	def relative_to_src_filepath(self):
		return os.path.join(self.patch_dir.rel_dir, self.file_name)

	def split_patches_from_file(self) -> list["PatchInPatchFile"]:
		# Hack: for autogen dirs, we just need to be as fast as possible, don't parse anything.
		if self.patch_dir.is_autogen_dir:
			contents_bytes = read_file_as_bytes(self.full_file_path())
			# @TODO: date?
			bare_patch = PatchInPatchFile(
				self, 1, "", f"Autogenerated patch",
				f"Armbian Autopatcher <auto.patch@armbian.com>",
				f"[AUTOGEN] {self.relative_dirs_and_base_file_name}", None)
			bare_patch.diff_bytes = contents_bytes
			log.debug(f"Patch file {self.full_file_path()} is autogenerated.")
			return [bare_patch]

		counter: int = 1
		mbox: mailbox.mbox = mailbox.mbox(self.full_file_path())
		is_invalid_mbox: bool = False

		# Sanity check: if the file is understood as mailbox, make sure the first line is a valid "From " line,
		# and has the magic marker 'Mon Sep 17 00:00:00 2001' in it; otherwise, it could be a combined
		# bare patch + mbox-formatted patch in a single file, and we'd lose the bare patch.
		if len(mbox) > 0:
			contents, contents_read_problems = read_file_as_utf8(self.full_file_path())
			first_line = contents.splitlines()[0].strip()
			if not first_line.startswith("From ") or ((MAGIC_MBOX_MARKER_STANDARD not in first_line) and (MAGIC_MBOX_MARKER_B4 not in first_line)):
				# is_invalid_mbox = True # we might try to recover from this is there's too many
				# log.error(
				raise Exception(
					f"File {self.full_file_path()} seems to be a valid mbox file, but it begins with"
					f" '{first_line}', but in mbox the 1st line should be a valid From: header"
					f" with the magic date.")
			# Obtain how many times the magic marker date string is present in the contents
			magic_marker_count = contents.count(MAGIC_MBOX_MARKER_STANDARD) + contents.count(MAGIC_MBOX_MARKER_B4)
			if magic_marker_count != len(mbox):
				# is_invalid_mbox = True # we might try to recover from this is there's too many
				# log.error(
				raise Exception(
					f"File {self.full_file_path()} seems to be a valid mbox file, but it contains"
					f" {magic_marker_count} magic marker dates, while the mbox file has been parsed as"
					f" {len(mbox)} patches. Check the file for mbox formatting errors.")

		# if there is no emails, it's a diff-only patch file.
		if is_invalid_mbox or len(mbox) == 0:
			# read the file into a string; explicitly use utf-8 to not depend on the system locale
			diff, read_problems = read_file_as_utf8(self.full_file_path())
			bare_patch = PatchInPatchFile(self, counter, diff, None, None, None, None)
			bare_patch.problems.append("not_mbox")
			bare_patch.problems.extend(read_problems)
			log.debug(f"Patch file {self.full_file_path()} is not properly mbox-formatted.")
			return [bare_patch]

		# loop over the emails in the mbox
		patches: list[PatchInPatchFile] = []
		msg: mailbox.mboxMessage
		for msg in mbox:
			problems: list[str] = []
			patch: str
			try:
				patch = msg.get_payload(decode=True).decode("utf-8")
			except UnicodeDecodeError as e:
				log.warning(f"Invalid UTF-8 mbox payload in file {self.full_file_path()}(:{counter}): {e}")
				problems.append("invalid_utf8_mbox")
				patch = msg.get_payload()  # this will mangle valid utf-8; go ahead and use that anyway

			# split the patch itself and the description from the payload
			desc, patch_contents = self.split_description_and_patch(patch)
			if len(patch_contents) == 0:
				log.warning(f"WARNING: patch file {self.full_file_path()} fragment {counter} contains an empty patch")
				continue

			# Sanity check: if the patch_contents contains the magic marker, something is _very_ wrong, and we're gonna eat a patch.
			if (MAGIC_MBOX_MARKER_STANDARD in patch_contents) or (MAGIC_MBOX_MARKER_B4 in patch_contents):
				raise Exception(
					f"File {self.full_file_path()} fragment {counter} seems to be a valid mbox file, but it contains"
					f" the magic date in the patch contents, shouldn't happen. Check the mbox formatting.")

			patches.append(PatchInPatchFile(
				self, counter, patch_contents, desc, msg['From'], msg['Subject'], msg['Date']))

			counter += 1

		# sanity check, throw exception if there are no patches
		if len(patches) == 0:
			raise Exception("No valid patches found in file " + self.full_file_path())

		if (len(patches) > 1):
			self.multiple_patches_in_file = True

		return patches

	@staticmethod
	def split_description_and_patch(full_message_text: str) -> tuple["str | None", str]:
		separator = "\n---\n"
		# check if the separator is in the patch, if so, split
		if separator in full_message_text:
			# find the _last_ occurrence of the separator, and split two chunks from that position
			separator_pos = full_message_text.rfind(separator)
			desc = full_message_text[:separator_pos]
			patch = full_message_text[separator_pos + len(separator):]
			return desc, patch
		else:  # no separator, so no description, patch is the full message
			desc = None
			patch = full_message_text
		return desc, patch

	def rewrite_patch_file(self, patches: list["PatchInPatchFile"]):
		# Produce a mailbox file from the patches.
		# The patches are assumed to be in the same order as they were in the original file.
		# The original file is overwritten.
		output_file = self.full_file_path()
		log.info(f"Rewriting {output_file} with new patches...")
		with open(output_file, "w") as f:
			for patch in patches:
				if patch.parent.patch_dir.is_autogen_dir:
					log.debug(f"Skipping autogen patch {patch.counter} in file {output_file}...")
				else:
					log.info(f"Writing patch {patch.counter} to {output_file}...")
					f.write(patch.rewritten_patch)


# Placeholder for future manual work
def shorten_patched_file_name_for_stats(path):
	return os.path.basename(path)


def parse_patch_stdout_for_files(stdout_output: str):
	# run the REGEX_PATCH_FILENAMES on the output; get the group 1 (the filename) for each match
	# log.debug(f"Running regex on {stdout_output}")
	ret: list[str] = re.findall(REGEX_PATCH_FILENAMES, stdout_output, re.MULTILINE)
	# log.debug(f"Found {len(ret)} patched files in patch output: {','.join(ret)}.")
	return ret  # warning: this includes files the patch *deleted* too


class PatchInPatchFile:

	def __init__(self, parent: PatchFileInDir, counter: int, diff: str, desc, from_hdr, sbj_hdr, date_hdr):
		self.problems: list[str] = []
		self.applied_ok: bool = False
		self.rewritten_patch: str | None = None
		self.git_commit_hash: str | None = None
		self.actually_patched_files: list[str] = []

		self.parent: PatchFileInDir = parent
		self.counter: int = counter
		self.diff: str | None = diff
		self.diff_bytes: bytes | None = None

		self.failed_to_parse: bool = False

		# Basic parsing of properly mbox-formatted patches
		self.desc: str = downgrade_to_ascii(desc) if desc is not None else None
		self.from_name, self.from_email = self.parse_from_name_email(from_hdr) if from_hdr is not None else (
			None, None)
		self.subject: str = downgrade_to_ascii(fix_patch_subject(sbj_hdr)) if sbj_hdr is not None else None
		self.date = email.utils.parsedate_to_datetime(date_hdr) if date_hdr is not None else None

		self.patched_file_stats_dict: dict = {}
		self.total_additions: int = 0
		self.total_deletions: int = 0
		self.files_modified: int = 0
		self.files_added: int = 0
		self.files_renamed: int = 0
		self.files_removed: int = 0
		self.created_file_names = []
		self.deleted_file_names = []
		self.renamed_file_names_source = []  # The original file names of renamed files
		self.all_file_names_touched = []
		self.rejects: str | None = None
		self.patch_output: str | None = None

	def parse_from_name_email(self, from_str: str) -> tuple["str | None", "str | None"]:
		m = re.match(r'(?P<name>.*)\s*<\s*(?P<email>.*)\s*>', from_str)
		if m is None:
			self.problems.append("invalid_author")
			log.debug(
				f"Failed to parse name and email from: '{from_str}' while parsing patch {self.counter} in file {self.parent.full_file_path()}")
			return downgrade_to_ascii(remove_quotes(from_str)), "unknown-email@domain.tld"
		else:
			# Return the name and email
			return downgrade_to_ascii(remove_quotes(m.group("name"))), remove_quotes(m.group("email"))

	def one_line_patch_stats(self) -> str:
		if (not self.failed_to_parse) and (not self.parent.patch_dir.is_autogen_dir):
			files_desc = ", ".join(self.patched_file_stats_dict)
			return f"{self.text_diffstats()} {{{files_desc}}}"
		return ""

	def text_diffstats(self) -> str:
		operations: list[str] = []
		operations.append(f"{self.files_modified}M") if self.files_modified > 0 else None
		operations.append(f"{self.files_added}A") if self.files_added > 0 else None
		operations.append(f"{self.files_removed}D") if self.files_removed > 0 else None
		operations.append(f"{self.files_renamed}R") if self.files_renamed > 0 else None
		return f"(+{self.total_additions}/-{self.total_deletions})[{', '.join(operations)}]"

	def parse_patch(self):
		# Hack: don't parse if autogenned; this could also be "don't parse if larger than X megabytes" since
		# large patches cause trouble
		if self.parent.patch_dir.is_autogen_dir:
			log.debug(
				f"Skipping parsing of auto-generated patch {self.counter} in file {self.parent.full_file_path()}")
			return
		else:
			# parse the patch, using the unidiff package
			try:
				patch = PatchSet(self.diff, encoding=None)
			except Exception as e:
				self.problems.append("invalid_diff")
				self.failed_to_parse = True
				log.warning(f"Failed to parse unidiff for file {self.parent.full_file_path()}(:{self.counter}): '{str(e).strip()}'")
				return  # no point in continuing; the patch is invalid; might be recovered during apply

		self.total_additions = 0
		self.total_deletions = 0
		self.files_renamed = 0
		self.files_modified = len(patch.modified_files)
		self.files_added = len(patch.added_files)
		self.files_removed = len(patch.removed_files)
		self.created_file_names = [f.path for f in patch.added_files]
		self.deleted_file_names = [f.path for f in patch.removed_files]
		self.renamed_file_names_source = []  # computed below
		self.all_file_names_touched = \
			[f.path for f in patch.added_files] + \
			[f.path for f in patch.modified_files] + \
			[f.path for f in patch.removed_files]
		self.patched_file_stats_dict = {}
		for f in patch:
			if not f.is_binary_file:
				self.total_additions += f.added
				self.total_deletions += f.removed
				self.patched_file_stats_dict[shorten_patched_file_name_for_stats(f.path)] = {
					"abs_changed_lines": f.added + f.removed}
			self.files_renamed = self.files_renamed + 1 if f.is_rename else self.files_renamed
			if f.is_rename:
				sans_prefix = f.source_file[2:] if f.source_file.startswith("a/") else f.source_file
				self.renamed_file_names_source.append(sans_prefix)
		# sort the self.patched_file_stats_dict by the abs_changed_lines, descending
		self.patched_file_stats_dict = dict(sorted(
			self.patched_file_stats_dict.items(),
			key=lambda item: item[1]["abs_changed_lines"],
			reverse=True))
		# sanity check; if all the values are zeroes, throw an exception
		if self.total_additions == 0 and self.total_deletions == 0 and \
			self.files_modified == 0 and self.files_added == 0 and self.files_removed == 0:
			self.problems.append("diff_has_no_changes")
			raise Exception(
				f"Patch file {self.parent.full_file_path()} has no changes. diff is {len(self.diff)} bytes: '{self.diff}'")

	def __str__(self) -> str:
		return self.str_oneline_around("->", "<-")

	def str_oneline_around(self, prefix, suffix):
		# extra_email = f"{self.from_email}" if self.from_email is not None else ""
		# extra_subject = f":'{self.subject}'" if self.subject is not None else ""
		# extra_author = f":{extra_email}{extra_subject}"
		desc: str = \
			f"{prefix}{self.parent.relative_dirs_and_base_file_name}(:{self.counter}) " + \
			f"{self.one_line_patch_stats()}{suffix}"
		return desc

	def apply_patch(self, working_dir: str, options: dict[str, bool]):
		# Sanity check: if patch would create files, make sure they don't exist to begin with.
		# This avoids patches being able to overwrite the mainline.
		for would_be_created_file in self.created_file_names:
			full_path = os.path.join(working_dir, would_be_created_file)
			if os.path.exists(full_path):
				self.problems.append("overwrites")
				if options["allow_recreate_existing_files"]:
					log.debug(f"Tolerating recreation of {would_be_created_file} in {self} as instructed.")
					os.remove(full_path)
				else:
					log.warning(
						f"File {would_be_created_file} already exists, but patch {self} would re-create it.")

		# Use the 'patch' utility to apply the patch.
		if self.diff_bytes is None:
			real_input = self.diff.encode("utf-8")
		else:
			real_input = self.diff_bytes

		# create a temporary filename (don't create the file yet: patch will maybe create it)
		rejects_file = tempfile.mktemp()
		# log.debug(f"Rejects file is going to be '{rejects_file}'...")

		proc = subprocess.run(
			["patch", "--batch", "-p1", "-N", f"--reject-file={rejects_file}", "--quoting-style=c"],
			cwd=working_dir,
			input=real_input,
			stdout=subprocess.PIPE,
			stderr=subprocess.PIPE,
			check=False)
		# read the output of the patch command
		stdout_output = proc.stdout.decode("utf-8").strip()
		stderr_output = proc.stderr.decode("utf-8").strip()
		# if stdout_output != "":
		# log.debug(f"patch stdout: {stdout_output}")
		# if stderr_output != "":
		# log.warning(f"patch stderr: {stderr_output}")

		# Check if the rejects exists:
		if os.path.exists(rejects_file):
			log.debug(f"Rejects file {rejects_file} exists.")
			# Show its contents
			with open(rejects_file, "r") as f:
				reject_contents = f.read()
				self.rejects = reject_contents
			# delete it
			os.remove(rejects_file)

		# Look at stdout. If it contains:
		if " (offset" in stdout_output or " with fuzz " in stdout_output:
			log.debug(f"Patch {self} needs rebase: offset/fuzz used during apply.")
			self.problems.append("needs_rebase")

		if "can't find file to patch at input line" in stdout_output:
			log.warning(f"Patch {self} needs fixing: can't find file to patch.")
			self.problems.append("missing_file")

		# parse the stdout output for the files actually patched.
		if options["set_patch_date"]:
			self.actually_patched_files = parse_patch_stdout_for_files(stdout_output)
			self.apply_patch_date_to_files(working_dir, options)

		# Store the stdout and stderr output
		patch_output = ""
		patch_output += f"{stdout_output}\n" if stdout_output != "" else ""
		patch_output += f"{stderr_output}\n" if stderr_output != "" else ""
		self.patch_output = f"{patch_output}"

		# Check if the exit code is not zero and bomb
		if proc.returncode != 0:
			self.problems.append("failed_apply")
			raise Exception(f"Failed to apply patch {self.parent.full_file_path()}")

	def commit_changes_to_git(self, repo: git.Repo, add_rebase_tags: bool, split_patches: bool, pconfig: PatchingConfig):
		log.info(f"Committing changes to git: {self.parent.relative_dirs_and_base_file_name}")
		# add all the files that were touched by the patch
		# if the patch failed to parse, this will be an empty list, so we'll just add all changes.
		add_all_changes_in_git = False
		if (not self.failed_to_parse) and (not self.parent.patch_dir.is_autogen_dir):
			# sanity check.
			if len(self.all_file_names_touched) == 0:
				raise Exception(
					f"Patch {self} has no files touched, but is not marked as failed to parse.")
			# add all files to git staging area
			all_files_to_add: list[str] = []
			for file_name in self.all_file_names_touched:
				is_delete = False
				# Check if deleted, don't complain if so.
				if file_name in self.deleted_file_names:
					is_delete = True

				log.info(f"Adding file {file_name} to git")
				full_path = os.path.join(repo.working_tree_dir, file_name)
				if (not os.path.exists(full_path)) and (not is_delete):
					self.problems.append("wrong_strip_level")
					log.error(f"File '{full_path}' does not exist, but is touched by {self}")
					add_all_changes_in_git = True
				else:
					all_files_to_add.append(file_name)

			# Also add all source (pre-rename) files that were renamed, sans-checking, since they won't exist.
			for file_name in self.renamed_file_names_source:
				# But, the file has to be at least inside the repo; it's not a real rename if it's outside (eg: bad /dev/null patches)
				if file_name.startswith("/"):
					log.warning(f"File {file_name} claims to be a renamed source file, but is outside the repo.")
					continue
				log.info(f"Adding file {file_name} (rename/source) to git")
				all_files_to_add.append(file_name)

			if split_patches:
				return self.commit_changes_to_git_grouped(all_files_to_add, repo)

			if not add_all_changes_in_git:
				log.debug(f"Adding (pre-config ) {len(all_files_to_add)} files to git: {' '.join(all_files_to_add)}")
				do_not_commit_files = pconfig.patches_to_git_config.do_not_commit_files  # from config
				do_not_commit_regexes = pconfig.patches_to_git_config.do_not_commit_regexes  # from config
				final_files_to_add = [f for f in all_files_to_add if f not in do_not_commit_files]
				final_files_to_add = [f for f in final_files_to_add if not any(re.match(r, f) for r in do_not_commit_regexes)]
				log.debug(f"Adding (post-config) {len(final_files_to_add)} files to git: {' '.join(final_files_to_add)}")
				if len(final_files_to_add) == 0:
					log.warning(f"There are 0 files to commit post-config. The whole patch should be removed.")
					self.problems.append("no_files_to_commit_after_config")
					return None
				repo.git.add("-f", final_files_to_add)

		if self.failed_to_parse or self.parent.patch_dir.is_autogen_dir or add_all_changes_in_git:
			log.warning(f"Rescue: adding all changed files to git for {self}")
			repo.git.add(repo.working_tree_dir)

		desc_no_none = self.desc if self.desc is not None else ""
		commit_message = f"{self.subject}\n\n{desc_no_none}"

		if add_rebase_tags:
			commit_message = f"{self.parent.relative_dirs_and_base_file_name}(:{self.counter})\n\nOriginal-Subject: {self.subject}\n{self.desc}"
			commit_message = f"{commit_message}\n{self.patch_rebase_tags_desc()}"

		author: git.Actor = git.Actor(self.from_name, self.from_email)
		committer: git.Actor = git.Actor("Armbian AutoPatcher", "patching@armbian.com")
		commit = repo.index.commit(
			message=commit_message,
			author=author,
			committer=committer,
			author_date=self.date,
			commit_date=self.date,
			skip_hooks=True
		)
		log.info(f"Committed changes to git: {commit.hexsha}")
		# Make sure the commit is not empty
		if commit.stats.total["files"] == 0:
			self.problems.append("empty_commit")
			raise Exception(
				f"Commit {commit.hexsha} ended up empty; source patch is {self} at {self.parent.full_file_path()}(:{self.counter})")
		return {"commit_hash": commit.hexsha, "patch": self}

	def commit_changes_to_git_grouped(self, all_files_to_add: list[str], repo: git.Repo):
		all_commits = []
		prefix = "Feiteng "
		grouped_files = {}
		# group files by directory
		for file_name in all_files_to_add:
			dir_name = os.path.dirname(file_name)
			if dir_name not in grouped_files:
				grouped_files[dir_name] = []
			grouped_files[dir_name].append(file_name)

		for group_name, files in grouped_files.items():
			for one_file in files:
				repo.git.add(one_file)

			commit_message = f"{prefix}{group_name}\n\n{prefix}{group_name}"
			author: git.Actor = git.Actor("Ricardo Pardini", "ricardo@pardini.net")
			commit = repo.index.commit(
				message=commit_message,
				author=author,
				committer=author,
				author_date=self.date,
				commit_date=self.date,
				skip_hooks=True
			)
			log.info(f"Committed changes to git: {commit.hexsha}")
			# Make sure the commit is not empty
			if commit.stats.total["files"] == 0:
				self.problems.append("empty_commit")
				raise Exception(
					f"Commit {commit.hexsha} ended up empty; source patch is {self} at {self.parent.full_file_path()}(:{self.counter})")
			all_commits.append({"commit_hash": commit.hexsha, "patch": self})
		return all_commits

	def patch_rebase_tags_desc(self):
		tags = {}
		tags["Patch-File"] = self.parent.relative_dirs_and_base_file_name
		tags["Patch-File-Counter"] = self.counter
		tags["Patch-Rel-Directory"] = self.parent.patch_dir.rel_dir
		tags["Patch-Type"] = self.parent.patch_dir.patch_root_dir.patch_type
		tags["Patch-Root-Type"] = self.parent.patch_dir.root_type
		tags["Patch-Sub-Type"] = self.parent.patch_dir.sub_type
		if self.subject is not None:
			tags["Original-Subject"] = self.subject
		ret = ""
		for k, v in tags.items():
			ret += f"X-Armbian: {k}: {v}\n"
		return ret

	def markdown_applied(self):
		if self.applied_ok:
			return "✅"
		return "❌"

	def markdown_problems(self):
		ret = []
		# if it's a patch in a series, add emoji
		if self.parent.from_series:
			ret.append(f" 📜 ")

		if self.parent.patch_dir.is_autogen_dir:
			ret.append(f" 🤖 ")

		if len(self.problems) == 0:
			ret.append("✅ ")

		for problem in self.problems:
			if problem in ["not_mbox", "needs_rebase"]:
				# warning emoji
				ret.append(f"⚠️`[{problem}]` ")
			elif problem in ["autogen"]:
				ret.append(f"ℹ️`[{problem}]` ")
			else:
				ret.append(f"❌`[{problem}]` ")

		# if it's a user patch, add smiley
		if self.parent.patch_dir.patch_root_dir.root_type == "user":
			ret.append(" 🫠`[user]` ")

		return " ".join(ret)

	def markdown_diffstat(self):
		return f"`{self.text_diffstats()}`"

	def text_files(self):
		ret = []
		max_files_shown = 15
		file_names = list(self.patched_file_stats_dict.keys())
		if len(file_names) == 0:
			return "?"
		for file_name in file_names[:max_files_shown]:
			ret.append(f"{file_name}")
		if len(file_names) > max_files_shown:
			ret.append(f"and {len(file_names) - max_files_shown} more")
		return ", ".join(ret)

	def markdown_files(self):
		ret = []
		max_files_shown = 15
		# Use the keys of the patch_file_stats_dict which is already sorted by the larger files
		file_names = list(self.patched_file_stats_dict.keys())
		# if no files were touched, just return an ?
		if len(file_names) == 0:
			return "`?`"
		for file_name in file_names[:max_files_shown]:
			ret.append(f"`{file_name}`")
		if len(file_names) > max_files_shown:
			ret.append(f"_and {len(file_names) - max_files_shown} more_")
		return ", ".join(ret)

	def text_author(self):
		if self.from_name:
			return f"{self.from_name.strip()}"
		return "[no Author]"

	def markdown_author(self):
		if self.from_name:
			return f"`{self.from_name.strip()}`"
		return "`[no Author]`"

	def markdown_subject(self):
		if self.subject:
			return f"_{self.subject}_"
		return "`[no Subject]`"

	def text_subject(self):
		if self.subject:
			return f"{self.subject}"
		return "[no Subject]"

	def markdown_link_to_patch(self):
		if self.git_commit_hash is None:
			return ""
		return f"{self.git_commit_hash} "

	def markdown_name(self, skip_markdown=False):
		ret = []
		escape = "`" if not skip_markdown else ""
		patch_name = self.parent.relative_dirs_and_base_file_name
		# if the basename includes slashes, split after the last slash, the first part is the directory, second the file
		if "/" in self.parent.relative_dirs_and_base_file_name:
			dir_name, patch_name = self.parent.relative_dirs_and_base_file_name.rsplit("/", 1)
			if dir_name is not None:
				# get only the last part of the dir_name
				dir_name = dir_name.split("/")[-1]
				ret.append(f"{escape}[{dir_name}/]{escape}")
		ret.append(f"{escape}{patch_name}{escape}")
		return " ".join(ret)

	def rich_name_status(self):
		color = "green"
		for problem in self.problems:
			if problem in ["not_mbox", "needs_rebase"]:
				color = "yellow"
			else:
				color = "red"
		bold = 'bold dim' if background_dark_or_light() == 'light' else 'bold'
		# @TODO: once our ansi-haste supports it, use [link url=file://blaaa]
		if self.parent.multiple_patches_in_file:
			return f"[{bold}][{color}]{self.markdown_name(skip_markdown=True)}[/{bold}](:{self.counter})"
		else:
			return f"[{bold} {color}]{self.markdown_name(skip_markdown=True)}"

	def rich_patch_output(self):
		ret = self.patch_output
		color_tags = {
			'green': ['Reversed (or previously applied) patch detected!'],
			'yellow': ['with fuzz', 'offset ', ' hunks ignored', ' hunk ignored'],
			'red': ['hunk FAILED', 'hunks FAILED']
		}
		bold = 'bold dim' if background_dark_or_light() == 'light' else 'bold'
		# use Rich's syntax highlighting to highlight with color
		for color in color_tags:
			for tag in color_tags[color]:
				ret = ret.replace(tag, f"[{bold} {color}]{tag}[/{bold} {color}]")
		return ret

	def apply_patch_date_to_files(self, working_dir, options):
		# The date applied to the patched files is:
		# 1) The date of the root Makefile
		# 2) The date of the patch file
		# And date is 2, unless 1 is higher.
		patch_mtime = os.path.getmtime(self.parent.full_file_path())
		makefile_mtime = options["root_makefile_date"]
		final_mtime = makefile_mtime
		if patch_mtime > makefile_mtime:
			log.debug(f"Patch {self.parent.full_file_path()} is newer than root Makefile, using patch date")
			final_mtime = patch_mtime
		else:
			log.debug(
				f"Root Makefile is newer than patch '{self.parent.full_file_path()}', using Makefile date")
		# Apply the date to all files that were touched by the patch
		# If the patch parsed OK, avoid trying to touch files the patch deleted.
		files_to_touch = self.actually_patched_files
		if not self.failed_to_parse:
			# remove self.deleted_file_names files_to_touch
			files_to_touch = [f for f in files_to_touch if f not in self.deleted_file_names]

		for file_name in files_to_touch:
			# log.debug(f"Setting mtime of '{file_name}' to '{final_mtime}'.")
			file_path = os.path.join(working_dir, file_name)
			try:
				os.utime(file_path, (final_mtime, final_mtime))
			except FileNotFoundError:
				log.warning(f"File '{file_path}' not found in patch {self}, can't set mtime.")


def fix_patch_subject(subject):
	# replace newlines with one space
	subject = re.sub(r"\s+", " ", subject.strip())
	# replace every non-printable character with a space
	subject = re.sub(r"[^\x20-\x7e]", " ", subject)
	# replace two consecutive spaces with one
	subject = re.sub(r" {2}", " ", subject).strip()
	# remove tags from the beginning of the subject
	tags = ['PATCH']
	for tag in tags:
		# subject might begin with "[tag xxxxx]"; remove it
		if subject.startswith(f"[{tag}"):
			subject = subject[subject.find("]") + 1:].strip()
	prefixes = ['FROMLIST(v1): ']
	for prefix in prefixes:
		if subject.startswith(prefix):
			subject = subject[len(prefix):].strip()
	return subject


# This is definitely not the right way to do this, but it works for now.
def prepare_clean_git_tree_for_patching(repo: git.Repo, revision_sha: str, branch_name: str):
	# Let's find the Commit object for the revision_sha
	log.debug("Resetting git tree to revision '%s'", revision_sha)
	commit = repo.commit(revision_sha)
	# Lets checkout, detached HEAD, to that Commit
	repo.head.reference = commit
	repo.head.reset(index=True, working_tree=True)
	# Let's create a new branch, and checkout to it, discarding any existing branch
	log.debug("Creating branch '%s'", branch_name)
	repo.create_head(branch_name, revision_sha, force=True)
	repo.head.reference = repo.heads[branch_name]
	repo.head.reset(index=True, working_tree=True)
	# Let's remove all the untracked, but not ignored, files from the working copy
	for file in repo.untracked_files:
		full_name = os.path.join(repo.working_tree_dir, file)
		log.debug(f"Removing untracked file '{file}'")
		os.remove(full_name)


def export_commit_as_patch(repo: git.Repo, commit: str):
	# Export the commit as a patch
	proc = subprocess.run([
		"git", "format-patch",
		"--unified=3",  # force 3 lines of diff context
		"--keep-subject",  # do not add a prefix to the subject "[PATCH] "
		# "--add-header=Organization: Armbian",  # add a header to the patch (ugly, changes the header)
		"--no-encode-email-headers",  # do not encode email headers
		# "--signature=66666" # add a signature; this does not work and causes patch to not be emitted.
		'--signature', "Armbian",
		'--zero-commit',  # do not use the git revision, instead 000000...0000
		'--stat=120',  # 'wider' stat output; default is 80
		'--stat-graph-width=10',  # shorten the diffgraph graph part, it's too long
		'--abbrev=12',  # force index length to 12 - essential for the regex below to work
		"-1", "--stdout", commit
	],
		cwd=repo.working_tree_dir,
		stdout=subprocess.PIPE,
		stderr=subprocess.PIPE,
		check=False)
	# read the output of the patch command
	stdout_output = proc.stdout.decode("utf-8")
	stderr_output = proc.stderr.decode("utf-8")
	# Check if the exit code is not zero and bomb
	if proc.returncode != 0:
		raise Exception(f"Failed to export commit {commit} to patch: {stderr_output}")
	if stdout_output == "":
		raise Exception(f"Failed to export commit {commit} to patch: no output")

	# Now, massage the output. We don't want the "index 08c33ec7e9f1..528741fcc0ec 100644" lines changing every time.
	# We do need to preserve "0000000000.." ones as that indicates new file creation.
	# Use a regular expression and a callback to decide. Check the top of this file for the regex and callback.
	rewritten_indexes = re.sub(index_rewrite_regexp, rewrite_indexes_callback, stdout_output)

	# If rewritten is same as original this didn't work, surely.
	if rewritten_indexes == stdout_output:
		raise Exception(f"Failed to rewrite indexes in patch output: {stdout_output}")

	return rewritten_indexes


# Hack
def downgrade_to_ascii(utf8: str) -> str:
	return unidecode(utf8)


def remove_quotes(utf8: str) -> str:
	return utf8.replace('"', '')


# Try hard to read a possibly invalid utf-8 file
def read_file_as_utf8(file_name: str) -> tuple[str, list[str]]:
	with open(file_name, "rb") as f:
		content = f.read()  # Read the file as bytes
		try:
			return content.decode("utf-8"), []  # no problems if this worked
		except UnicodeDecodeError as ude:
			log.warning(f"File '{file_name}' is not valid utf-8, trying to fix it...: '{ude}'")
			# If decoding failed, try to decode as iso-8859-1 # @TODO: or big5?
			return content.decode("iso-8859-1"), ["invalid_utf8"]  # utf-8 problems


def read_file_as_bytes(file_name: str) -> bytes:
	with open(file_name, "rb") as f:
		return f.read()  # Read the file as bytes


# Extremely Armbian-specific.
def perform_git_archeology(
	base_armbian_src_dir: str, armbian_git_repo: git.Repo, patch: PatchInPatchFile,
	bad_archeology_hexshas: list[str], fast: bool) -> bool:
	log.info(f"Trying to recover description for {patch.parent.file_name}:{patch.counter}")
	file_name_for_search = f"{patch.parent.file_name_no_ext_no_dirs}.patch"

	patch_file_paths: list[str] = []
	if fast:
		patch_file_paths = [patch.parent.full_file_path()]
	else:
		# Find all the files in the repo with the same name as the patch file.
		# Use the UNIX find command to find all the files with the same name as the patch file.
		proc = subprocess.run(
			[
				"find", base_armbian_src_dir,
				"-name", file_name_for_search,
				"-type", "f"
			],
			cwd=base_armbian_src_dir, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
		patch_file_paths = proc.stdout.decode("utf-8").splitlines()
	log.info(f"Found {len(patch_file_paths)} files with name {file_name_for_search}")
	all_commits: list = []
	for found_file in patch_file_paths:
		relative_file_path = os.path.relpath(found_file, base_armbian_src_dir)
		hexshas = armbian_git_repo.git.log('--pretty=%H', '--follow', '--find-copies-harder', '--', relative_file_path) \
			.split('\n')
		log.info(f"- Trying to recover description for {relative_file_path} from {len(hexshas)} commits")

		# filter out hexshas that are in the known-bad archeology list
		hexshas = [hexsha for hexsha in hexshas if hexsha.strip() != '' and hexsha not in bad_archeology_hexshas]

		log.debug(f"hexshas: {hexshas}")

		commits = [armbian_git_repo.rev_parse(c) for c in hexshas]
		all_commits.extend(commits)

	unique_commits: list[git.Commit] = []
	for commit in all_commits:
		if commit not in unique_commits:
			unique_commits.append(commit)

	unique_commits.sort(key=lambda c: c.committed_datetime)

	if len(unique_commits) == 0:
		log.warning(f"Could not find any commits for '{file_name_for_search}'.")
		return False

	main_suspect: git.Commit = unique_commits[0]
	log.info(f"- Main suspect: {main_suspect}: {main_suspect.message.rstrip()} Author: {main_suspect.author}")

	# From the main_suspect, set the subject and the author, and the dates.
	main_suspect_msg_lines = main_suspect.message.splitlines()
	# strip each line
	main_suspect_msg_lines = [line.strip() for line in main_suspect_msg_lines]
	# remove empty lines
	main_suspect_msg_lines = [line for line in main_suspect_msg_lines if line != ""]
	main_suspect_subject = main_suspect_msg_lines[0].strip()
	# remove the first line, which is the subject
	suspect_desc_lines = main_suspect_msg_lines[1:]

	# Now, create a list for all other non-main suspects.
	other_suspects_desc: list[str] = []
	other_suspects_desc.extend(
		[f"> recovered message: > {suspect_desc_line}" for suspect_desc_line in suspect_desc_lines])
	other_suspects_desc.extend("")
	for commit in unique_commits:
		subject = commit.message.splitlines()[0].strip()
		rfc822_date = commit.committed_datetime.strftime("%a, %d %b %Y %H:%M:%S %z")
		other_suspects_desc.extend([
			f"- Revision {commit.hexsha}: https://github.com/armbian/build/commit/{commit.hexsha}",
			f"  Date: {rfc822_date}",
			f"  From: {commit.author.name} <{commit.author.email}>",
			f"  Subject: {subject}",
			""
		])

	patch.desc = downgrade_to_ascii("\n".join([f"> X-Git-Archeology: {line}" for line in other_suspects_desc]))

	if patch.subject is None:
		patch.subject = downgrade_to_ascii("[ARCHEOLOGY] " + main_suspect_subject)
	if patch.date is None:
		patch.date = main_suspect.committed_datetime
	if patch.from_name is None or patch.from_email is None:
		patch.from_name, patch.from_email = downgrade_to_ascii(
			main_suspect.author.name), main_suspect.author.email
	return True
